* signals waiters.
*/
pthread_t read_thr;
+ int read_thr_exists;
/*
* A list of fired watch messages, protected by a mutex. Users can
pthread_mutex_t request_mutex;
};
+static int read_message(struct xs_handle *h);
static void *read_thread(void *arg);
int xs_fileno(struct xs_handle *h)
int fd = -1, saved_errno;
if (stat(connect_to, &buf) != 0)
- goto error;
+ return NULL;
if (S_ISSOCK(buf.st_mode))
fd = get_socket(connect_to);
fd = get_dev(connect_to);
if (fd == -1)
- goto error;
+ return NULL;
h = malloc(sizeof(*h));
- if (h == NULL)
- goto error;
+ if (h == NULL) {
+ saved_errno = errno;
+ close(fd);
+ errno = saved_errno;
+ return NULL;
+ }
+
+ memset(h, 0, sizeof(*h));
h->fd = fd;
pthread_mutex_init(&h->request_mutex, NULL);
- if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0)
- goto error;
-
return h;
-
- error:
- saved_errno = errno;
- if (h != NULL)
- free(h);
- if (fd != -1)
- close(fd);
- errno = saved_errno;
- return NULL;
}
struct xs_handle *xs_daemon_open(void)
pthread_mutex_lock(&h->reply_mutex);
pthread_mutex_lock(&h->watch_mutex);
- /* XXX FIXME: May leak an unpublished message buffer. */
- pthread_cancel(h->read_thr);
- pthread_join(h->read_thr, NULL);
+ if (h->read_thr_exists) {
+ /* XXX FIXME: May leak an unpublished message buffer. */
+ pthread_cancel(h->read_thr);
+ pthread_join(h->read_thr, NULL);
+ }
list_for_each_entry_safe(msg, tmsg, &h->reply_list, list) {
free(msg->body);
struct xs_stored_msg *msg;
char *body;
+ /* Read from comms channel ourselves if there is no reader thread. */
+ if (!h->read_thr_exists && (read_message(h) == -1))
+ return NULL;
+
pthread_mutex_lock(&h->reply_mutex);
while (list_empty(&h->reply_list))
pthread_cond_wait(&h->reply_condvar, &h->reply_mutex);
{
struct iovec iov[2];
+ /* We dynamically create a reader thread on demand. */
+ pthread_mutex_lock(&h->request_mutex);
+ if (!h->read_thr_exists) {
+ if (pthread_create(&h->read_thr, NULL, read_thread, h) != 0) {
+ pthread_mutex_unlock(&h->request_mutex);
+ return false;
+ }
+ h->read_thr_exists = 1;
+ }
+ pthread_mutex_unlock(&h->request_mutex);
+
iov[0].iov_base = (void *)path;
iov[0].iov_len = strlen(path) + 1;
iov[1].iov_base = (void *)token;
ARRAY_SIZE(iov), NULL);
}
-static void *read_thread(void *arg)
+static int read_message(struct xs_handle *h)
{
- struct xs_handle *h = arg;
struct xs_stored_msg *msg = NULL;
char *body = NULL;
+ int saved_errno;
- for (;;) {
- msg = NULL;
- body = NULL;
-
- /* Allocate message structure and read the message header. */
- msg = malloc(sizeof(*msg));
- if (msg == NULL)
- goto error;
- if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
- goto error;
-
- /* Allocate and read the message body. */
- body = msg->body = malloc(msg->hdr.len + 1);
- if (body == NULL)
- goto error;
- if (!read_all(h->fd, body, msg->hdr.len))
- goto error;
- body[msg->hdr.len] = '\0';
-
- if (msg->hdr.type == XS_WATCH_EVENT) {
- pthread_mutex_lock(&h->watch_mutex);
+ /* Allocate message structure and read the message header. */
+ msg = malloc(sizeof(*msg));
+ if (msg == NULL)
+ goto error;
+ if (!read_all(h->fd, &msg->hdr, sizeof(msg->hdr)))
+ goto error;
- /* Kick users out of their select() loop. */
- if (list_empty(&h->watch_list) &&
- (h->watch_pipe[1] != -1))
- while (write(h->watch_pipe[1], body, 1) != 1)
- continue;
+ /* Allocate and read the message body. */
+ body = msg->body = malloc(msg->hdr.len + 1);
+ if (body == NULL)
+ goto error;
+ if (!read_all(h->fd, body, msg->hdr.len))
+ goto error;
+ body[msg->hdr.len] = '\0';
- list_add_tail(&msg->list, &h->watch_list);
- pthread_cond_signal(&h->watch_condvar);
+ if (msg->hdr.type == XS_WATCH_EVENT) {
+ pthread_mutex_lock(&h->watch_mutex);
- pthread_mutex_unlock(&h->watch_mutex);
- } else {
- pthread_mutex_lock(&h->reply_mutex);
+ /* Kick users out of their select() loop. */
+ if (list_empty(&h->watch_list) &&
+ (h->watch_pipe[1] != -1))
+ while (write(h->watch_pipe[1], body, 1) != 1)
+ continue;
- /* There should only ever be one response pending! */
- if (!list_empty(&h->reply_list)) {
- pthread_mutex_unlock(&h->reply_mutex);
- goto error;
- }
+ list_add_tail(&msg->list, &h->watch_list);
+ pthread_cond_signal(&h->watch_condvar);
- list_add_tail(&msg->list, &h->reply_list);
- pthread_cond_signal(&h->reply_condvar);
+ pthread_mutex_unlock(&h->watch_mutex);
+ } else {
+ pthread_mutex_lock(&h->reply_mutex);
+ /* There should only ever be one response pending! */
+ if (!list_empty(&h->reply_list)) {
pthread_mutex_unlock(&h->reply_mutex);
+ goto error;
}
+
+ list_add_tail(&msg->list, &h->reply_list);
+ pthread_cond_signal(&h->reply_condvar);
+
+ pthread_mutex_unlock(&h->reply_mutex);
}
+ return 0;
+
error:
- if (body != NULL)
- free(body);
- if (msg != NULL)
- free(msg);
+ saved_errno = errno;
+ free(msg);
+ free(body);
+ errno = saved_errno;
+ return -1;
+}
+
+static void *read_thread(void *arg)
+{
+ struct xs_handle *h = arg;
+
+ while (read_message(h) != -1)
+ continue;
+
return NULL;
}